package com.gdlt.mq2db;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Created by CM on 2017/3/14.
 *
 */
public class Runner {
    //@SuppressWarnings("unused")
    public static void Main(String[] args) {
        // read partitionid and config from args
        int partitionId = Integer.parseInt(args[0]);
        String configFile = args[1];

        Kafka2pgConfig.AddConfig(configFile);
        //Kafka2pgConfig conf = new Kafka2pgConfig(configFile);

        final PGWriter pgWriter = new PGWriter();
        long offset = pgWriter.getLastoffset();

        final KafkaReader reader = new KafkaReader(partitionId, offset);

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("Shutdown hook ran!");
                pgWriter.close();
                reader.close();
            }
        });

        while (true) {
            List<ConsumerRecord<String, String>> data = reader.getData();
            offset = data.get(data.size() - 1).offset();
            pgWriter.write(data, offset);
        }
    }

}
